feat(metadata): extract stream range index by lazy load StreamSetObject #2710
base: main
Pull Request Overview
This pull request introduces lazy loading of StreamSetObject metadata with range indexing to optimize object retrieval performance. It adds a new indexing system that helps efficiently search for objects based on stream IDs and offsets.
- Adds `StreamSetObjectRangeIndex` for mapping (streamId, startOffset) → objectId relationships
- Implements `StreamIdBloomFilter` to optimize stream presence checks in stream set objects
- Introduces incremental loading of stream set objects (5 at a time) with preloading strategies
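
To make the (streamId, startOffset) → objectId mapping concrete, here is a minimal sketch of a range index built on a sorted map. It is an illustration only: the class name `RangeIndexSketch` and its methods are hypothetical and are not taken from the PR's `StreamSetObjectRangeIndex`, whose locking, caching, and expiry are not shown in this conversation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical sketch, not the PR's StreamSetObjectRangeIndex.
final class RangeIndexSketch {
    // streamId -> (startOffset -> objectId), kept sorted by startOffset
    private final Map<Long, TreeMap<Long, Long>> byStream = new HashMap<>();

    // Record that objectId holds data for streamId beginning at startOffset.
    synchronized void put(long streamId, long startOffset, long objectId) {
        byStream.computeIfAbsent(streamId, id -> new TreeMap<>()).put(startOffset, objectId);
    }

    // Find the object whose range starts at or before the requested offset, if any.
    synchronized OptionalLong floorObjectId(long streamId, long offset) {
        TreeMap<Long, Long> ranges = byStream.get(streamId);
        if (ranges == null) {
            return OptionalLong.empty();
        }
        Map.Entry<Long, Long> entry = ranges.floorEntry(offset);
        return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
    }

    public static void main(String[] args) {
        RangeIndexSketch index = new RangeIndexSketch();
        index.put(1L, 0L, 100L);
        index.put(1L, 50L, 101L);
        System.out.println(index.floorObjectId(1L, 75L)); // object starting at offset 50
        System.out.println(index.floorObjectId(2L, 0L));  // unknown stream: empty
    }
}
```

A floor lookup only says which object starts at or before the offset; a caller would still need to confirm that the object's end offset covers the request.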
Reviewed Changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 6 comments.
File | Description |
---|---|
StreamSetObjectRangeIndex.java | New index class providing lazy loading and caching of stream set object range mappings |
S3StreamsMetadataImage.java | Updated getObjects implementation with incremental loading, preloading, and comprehensive debug context |
S3StreamsMetadataImageTest.java | Updated test interface methods to match new RangeGetter signature |
S3StreamsMetadataImageTest.java (core) | New comprehensive test suite for validating getObjects behavior with generated metadata |
StreamMetadataManager.java | Added bloom filter implementation and updated range getter with indexing support |
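
As background for the bloom filter mentioned in the table, the sketch below shows the general technique for a set of stream IDs: a membership test that can return false positives but never false negatives. It is a generic illustration with hypothetical names (`StreamBloomSketch`, `mightContain`) and does not reproduce the PR's `StreamIdBloomFilter`; in particular it does not cover the per-object removal that `removeObject` in the PR implies.

```java
import java.util.BitSet;

// Hypothetical sketch of a plain bloom filter over stream IDs.
final class StreamBloomSketch {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    StreamBloomSketch(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Set one bit per hash function for this stream ID.
    void add(long streamId) {
        for (int i = 0; i < hashes; i++) {
            bits.set(indexFor(streamId, i));
        }
    }

    // false means "definitely absent"; true means "possibly present".
    boolean mightContain(long streamId) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(indexFor(streamId, i))) {
                return false;
            }
        }
        return true;
    }

    // Double hashing: derive the i-th bit position from two mixed hash values.
    private int indexFor(long streamId, int i) {
        long mixed = streamId * 0x9E3779B97F4A7C15L;
        int h1 = (int) (mixed ^ (mixed >>> 32));
        int h2 = ((int) (mixed >>> 17)) | 1;
        return Math.floorMod(h1 + i * h2, size);
    }

    public static void main(String[] args) {
        StreamBloomSketch filter = new StreamBloomSketch(1024, 3);
        filter.add(42L);
        System.out.println(filter.mightContain(42L));
    }
}
```

A "possibly present" answer still requires reading the object's real index; the filter only lets a caller skip objects that definitely do not contain the stream.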
Comments suppressed due to low confidence (1)
metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java:445
- The calculation 'newStartOffset + r.nextLong(0, newStartOffset)' will result in endOffset being between newStartOffset and 2*newStartOffset, but this seems incorrect as endOffset should typically be greater than startOffset. The bound should likely be a fixed range or use a different calculation.
if (objects.size() > ctx.limit) {
    public static final ExecutorService UPDATE_INDEX_THREAD_POOL = Executors.newSingleThreadExecutor(
        ThreadUtils.createThreadFactory("StreamSetObjectRangeIndex", true));

    private static final Object DUMMAY_OBJECT = new Object();
The constant name has a typo: 'DUMMAY_OBJECT' should be 'DUMMY_OBJECT'.
Suggested change:
    - private static final Object DUMMAY_OBJECT = new Object();
    + private static final Object DUMMY_OBJECT = new Object();
Copilot uses AI. Check for mistakes.
    public static StreamSetObjectRangeIndex getInstance() {
        if (instance == null) {
            synchronized (NodeRangeIndexCache.class) {
Synchronizing on NodeRangeIndexCache.class is incorrect for StreamSetObjectRangeIndex singleton initialization. This should synchronize on StreamSetObjectRangeIndex.class instead.
Suggested change:
    - synchronized (NodeRangeIndexCache.class) {
    + synchronized (StreamSetObjectRangeIndex.class) {
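
For reference, a conventional double-checked-locking singleton that locks on its own class might look like the sketch below. The class name `LazySingletonSketch` is hypothetical, and the `volatile` modifier is an assumption: the excerpt above does not show how `instance` is declared in the PR.

```java
// Hypothetical sketch of double-checked locking on the class's own monitor.
final class LazySingletonSketch {
    // volatile so a fully constructed instance is visible to other threads
    private static volatile LazySingletonSketch instance;

    private LazySingletonSketch() {
    }

    static LazySingletonSketch getInstance() {
        LazySingletonSketch local = instance;      // first check, no lock
        if (local == null) {
            synchronized (LazySingletonSketch.class) {
                local = instance;                  // second check, under the lock
                if (local == null) {
                    local = new LazySingletonSketch();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance());
    }
}
```

Locking on an unrelated class still gives mutual exclusion as long as every initializer uses that same lock, but it couples the two classes and can cause needless contention, which is presumably why the comment flags it.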
    public void touch(Long streamId) {
        try {
            expireCache.get(streamId, () -> DUMMAY_OBJECT);
Reference to misspelled constant 'DUMMAY_OBJECT' should be 'DUMMY_OBJECT'.
Suggested change:
    - expireCache.get(streamId, () -> DUMMAY_OBJECT);
    + expireCache.get(streamId, () -> DUMMY_OBJECT);
    }

    withLock(streamId, () -> {
        longLongTreeMap.remove(startOffset, objectId);
The TreeMap.remove(key, value) method removes the mapping only if the key maps to the specified value. This should likely be just remove(startOffset) to remove the mapping for the key, regardless of the current value.
Suggested change:
    - longLongTreeMap.remove(startOffset, objectId);
    + longLongTreeMap.remove(startOffset);
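
The difference between the two calls can be seen in a small sketch. It assumes `longLongTreeMap` behaves like a `java.util.TreeMap<Long, Long>`; the actual type is not shown in this excerpt, and the helper names below are hypothetical. Whether the conditional form is a bug or a deliberate guard against removing a newer mapping depends on the PR's intent, which the excerpt does not settle.

```java
import java.util.TreeMap;

// Hypothetical sketch contrasting Map.remove(key, value) with Map.remove(key).
final class ConditionalRemoveSketch {
    // Removes the entry only if key currently maps to expectedValue.
    static boolean removeIfMatches(TreeMap<Long, Long> map, long key, long expectedValue) {
        return map.remove(key, expectedValue);
    }

    // Removes the entry for key whatever its value; returns the old value or null.
    static Long removeAlways(TreeMap<Long, Long> map, long key) {
        return map.remove(key);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> startOffsetToObjectId = new TreeMap<>();
        startOffsetToObjectId.put(0L, 100L);

        // The value differs, so the conditional remove leaves the entry in place.
        System.out.println(removeIfMatches(startOffsetToObjectId, 0L, 999L));
        System.out.println(startOffsetToObjectId.containsKey(0L));

        // The unconditional remove drops the entry regardless of its value.
        System.out.println(removeAlways(startOffsetToObjectId, 0L));
        System.out.println(startOffsetToObjectId.isEmpty());
    }
}
```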
    // update streamBloomFilter
    Set<Long> sets = Sets.difference(this.streamSetObjectIds, streamSetObjectIds);
    sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject);
[nitpick] The variable name 'sets' is misleading: it holds the difference between two sets, that is, the IDs in this.streamSetObjectIds that are absent from streamSetObjectIds. It should be called 'removedObjects' or similar to clarify that these are objects being removed from the bloom filter.
Suggested change:
    - sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject);
    + Set<Long> removedStreamSetObjectIds = Sets.difference(this.streamSetObjectIds, streamSetObjectIds);
    + removedStreamSetObjectIds.forEach(STREAM_ID_BLOOM_FILTER::removeObject);
    // retry all pending tasks
    retryPendingTasks();
    this.indexCache.asyncPrune(this::getStreamSetObjectIds);
    this.indexCache.asyncPrune(() -> streamSetObjectIds);
The lambda captures the local variable 'streamSetObjectIds' which refers to the old set of objects. This should capture 'this.streamSetObjectIds' to use the updated set of stream set object IDs.
Suggested change:
    - this.indexCache.asyncPrune(() -> streamSetObjectIds);
    + this.indexCache.asyncPrune(() -> this.streamSetObjectIds);
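
The capture semantics behind this comment can be sketched as follows. A lambda that references a local variable or parameter keeps the value it had when the lambda was created, while a lambda that references `this.field` reads the field each time it runs. The class `CaptureSketch` and its methods are hypothetical; which behaviour is right for `asyncPrune` depends on whether the local holds the old or the new set and on when the task runs, neither of which is shown in this excerpt.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch: snapshot capture of a parameter vs. live read of a field.
final class CaptureSketch {
    private Set<Long> streamSetObjectIds = Set.of(1L, 2L);

    // The parameter shadows the field; the lambda keeps this exact reference.
    Supplier<Set<Long>> snapshotSupplier(Set<Long> streamSetObjectIds) {
        return () -> streamSetObjectIds;
    }

    // The field is read every time get() is called.
    Supplier<Set<Long>> liveSupplier() {
        return () -> this.streamSetObjectIds;
    }

    void replace(Set<Long> next) {
        this.streamSetObjectIds = next;
    }

    public static void main(String[] args) {
        CaptureSketch sketch = new CaptureSketch();
        Supplier<Set<Long>> snapshot = sketch.snapshotSupplier(Set.of(1L, 2L));
        Supplier<Set<Long>> live = sketch.liveSupplier();
        sketch.replace(Set.of(3L));
        System.out.println(snapshot.get().equals(Set.of(1L, 2L))); // still the captured set
        System.out.println(live.get().equals(Set.of(3L)));         // sees the replacement
    }
}
```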
- Add `StreamIdBloomFilter` to help search objectId -> streamId.
- Add `StreamSetObjectRangeIndex` to help search (streamId, startOffset) -> objectId.
- When reading an SSO from ObjectReader, update the `StreamSetObjectRangeIndex` and `StreamIdBloomFilter`.
- getObjects supports loading the stream set objects in steps (5 SSOs each time).
- Support preloading SSOs to help in the index-not-found case.
- Add `S3StreamsMetadataImageTest`, which generates the image and checks that getObjects returns the right result.
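
The "load in steps of 5" idea from the list above can be sketched as a loop that examines candidates one batch at a time and stops once enough matches are found. This is an illustration under stated assumptions: `BatchLoadSketch`, `collect`, and the `coversRange` predicate are hypothetical, loading is synchronous here, and the PR's actual `getObjects` logic (asynchronous loading, preloading, index updates) is not reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical sketch of scanning candidate objects in fixed-size steps.
final class BatchLoadSketch {
    static final int STEP = 5; // batch size taken from the PR description

    int loaded = 0; // how many candidate objects were loaded so far

    // Load candidates STEP at a time; stop once `limit` matching objects are found.
    List<Long> collect(List<Long> candidateIds, LongPredicate coversRange, int limit) {
        List<Long> matches = new ArrayList<>();
        for (int from = 0; from < candidateIds.size() && matches.size() < limit; from += STEP) {
            int to = Math.min(from + STEP, candidateIds.size());
            List<Long> batch = candidateIds.subList(from, to);
            loaded += batch.size(); // stands in for reading the objects' indexes
            for (long id : batch) {
                if (coversRange.test(id)) {
                    matches.add(id);
                    if (matches.size() >= limit) {
                        break;
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L);
        BatchLoadSketch loader = new BatchLoadSketch();
        System.out.println(loader.collect(ids, id -> id % 4 == 0, 2));
        System.out.println(loader.loaded);
    }
}
```

Stopping early keeps the common case cheap: when the first batch already satisfies the request, the remaining objects are never loaded.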